package drds.plus.executor.command_handler.query;

import drds.plus.executor.ExecuteContext;
import drds.plus.executor.command_handler.TableWithIndexMetaData;
import drds.plus.executor.cursor.cursor.IAffectRowCursor;
import drds.plus.executor.cursor.cursor.IMergeCursor;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.impl.merge.ConcurrentMergeCursor;
import drds.plus.executor.cursor.cursor.impl.sort.DistinctCursor;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaData;
import drds.plus.executor.data_node_executor.DataNodeExecutorContext;
import drds.plus.executor.repository.Repository;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.ObjectCreateFactory;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.dml.IPut;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.MergeQuery;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Query;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.QueryConcurrencyWay;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.QueryWithIndex;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.Item;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.Function;
import drds.plus.sql_process.abstract_syntax_tree.expression.order_by.OrderBy;
import drds.tools.$;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

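/**
 * Query handler for {@code MergeQuery} plans. It implements the concurrency-control
 * strategy (broadcast, concurrent, per-data-node concurrent or serial) used to run the
 * sub-plans, and merges their cursors into a single result cursor.
 */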
@Slf4j
public class MergeQueryHandler extends AbstractQueryHandler {
    /**
     * Exposes the logger used by this handler.
     *
     * @return the class-level {@code log} field (generated by Lombok's {@code @Slf4j})
     */
    public Logger log() {
        final Logger handlerLogger = log;
        return handlerLogger;
    }

    public MergeQueryHandler() {
        super();
    }


    /**
     * Runs a {@link MergeQuery}: executes its sub-plans with the configured concurrency
     * strategy and merges the resulting cursors (all sub-cursors share the same schema).
     *
     * <ul>
     * <li>Non-sharded merge: exactly one sub-plan is expected; a merge cursor is built from
     * its metadata with an empty sub-cursor list and returned immediately.</li>
     * <li>Sharded merge of {@link IPut} plans: the affect-row counts of the sub-cursors are
     * summed, every sub-cursor is closed, and a single affect-row cursor is returned.</li>
     * <li>Sharded merge of queries: a merge-sorted (de-duplicating) cursor is built for
     * unions, otherwise a plain merge cursor.</li>
     * </ul>
     *
     * @param executeContext execution context supplying the repository and auto-commit flag
     * @param executePlan    the plan to run; must be a {@link MergeQuery}
     * @param cursor         only used as the result holder; the value passed in is never read
     * @return the merged cursor, or an affect-row cursor for put plans
     * @throws IllegalArgumentException if a non-sharded merge does not have exactly one sub-plan
     * @throws RuntimeException         the first failure raised while reading or closing sub-cursors
     */
    protected ISortingCursor doQuery(ExecuteContext executeContext, ExecutePlan executePlan, ISortingCursor cursor) throws RuntimeException {
        MergeQuery mergeQuery = (MergeQuery) executePlan;
        Repository repository = executeContext.getRepository();
        List<ExecutePlan> executePlanList = mergeQuery.getExecutePlanList();
        List<ISortingCursor> cursorList = new ArrayList<ISortingCursor>();

        if (!mergeQuery.isSharded()) {
            /*
             * A query that is driven by the results of the left table: return the merge cursor
             * directly. Such queries first fetch a batch of left values and then compute the
             * right values from them through the routing rules, so there is only one sub node.
             */
            if (executePlanList.size() != 1) {
                throw new IllegalArgumentException("executePlanList is not 1? may be 执行计划生育上有了问题了，查一下" + executePlan);
            }
            TableWithIndexMetaData tableWithIndexMetaData = new TableWithIndexMetaData();
            QueryWithIndex queryWithIndex = (QueryWithIndex) executePlanList.get(0);
            prepareTableAndIndexMetaDataForQuery(executeContext, queryWithIndex, tableWithIndexMetaData);

            CursorMetaData cursorMetaData = Utils.convertIndexMetaDataToCursorMetaData(queryWithIndex);

            // No key columns are derived on this path, so the merge cursor gets an empty order-by list.
            List<OrderBy> orderByList = new LinkedList<OrderBy>();
            return repository.getCursorFactory().mergeCursor(executeContext, mergeQuery, cursorList, cursorMetaData, orderByList);
        }

        if (mergeQuery.isBroadcastPut()) {
            // Broadcast: only the first node has to succeed.
            executeBroadCast(executeContext, executePlanList, cursorList);
        } else if (QueryConcurrencyWay.concurrent == mergeQuery.getQueryConcurrencyWay() && executeContext.isAutoCommit()) {
            executeConcurrent(executeContext, executePlanList, cursorList);
        } else if (QueryConcurrencyWay.data_node_concurrent == mergeQuery.getQueryConcurrencyWay() && executeContext.isAutoCommit()) {
            executeGroupConcurrent(executeContext, executePlanList, cursorList);
        } else {
            // Serial execution.
            execute(executeContext, executePlanList, cursorList);
        }

        if (executePlanList.get(0) instanceof IPut) {
            // Merge the affect-row counts of all sub-cursors.
            int affectRowSum = 0;
            boolean first = true;
            List<RuntimeException> exs = new ArrayList<RuntimeException>();
            RuntimeException readFailure = null;
            for (ISortingCursor affectRowCursor : cursorList) {
                try {
                    // After a read failure the remaining cursors are only closed, not read.
                    if (readFailure == null && (!mergeQuery.isDistinctGroupByShardColumns() || first)) {
                        RowValues rowData;
                        while ((rowData = affectRowCursor.next()) != null) {
                            Integer affectRow = rowData.getInteger(0);
                            if (affectRow != null) {
                                affectRowSum += affectRow;
                            }
                        }
                    }
                } catch (RuntimeException e) {
                    readFailure = e;
                } finally {
                    // Always close, so a failing cursor cannot leak itself or the ones after it.
                    first = false;
                    exs = affectRowCursor.close(exs);
                }
            }
            if (readFailure != null) {
                throw readFailure;
            }
            if (!exs.isEmpty()) {
                throw exs.get(0);
            }
            return repository.getCursorFactory().affectRowCursor(executeContext, affectRowSum);
        }

        // A union has to be de-duplicated; this assumes the sub-cursors are already sorted.
        if (mergeQuery.isUnion()) {
            cursor = this.buildMergeSortCursor(executeContext, repository, cursorList, false);
        } else {
            cursor = repository.getCursorFactory().mergeCursor(executeContext, mergeQuery, cursorList);
        }
        return cursor;
    }

    /**
     * Executes a broadcast put. The first plan is treated as the primary node and is run
     * synchronously; its failure propagates to the caller. The remaining plans are run
     * through {@link #executeConcurrent}, and a failure there is only logged as a warning.
     *
     * @param executeContext  execution context, also used to obtain the SQL text for logging
     * @param executePlanList plans to broadcast; the first element is the primary
     * @param subCursors      output list that receives the cursors that were produced
     * @throws RuntimeException if executing the first (primary) plan fails
     */
    private void executeBroadCast(ExecuteContext executeContext, List<ExecutePlan> executePlanList, List<ISortingCursor> subCursors) throws RuntimeException {
        // The first node acts as the primary: if it succeeds the whole broadcast succeeds.
        ExecutePlan firstExecutePlan = executePlanList.get(0);
        ISortingCursor schematicCursor = DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().execute(executeContext, firstExecutePlan);
        subCursors.add(schematicCursor);

        if (executePlanList.size() > 1) {
            // Independent copy of everything after the primary plan.
            List<ExecutePlan> otherExecutePlan = new ArrayList<ExecutePlan>(executePlanList.subList(1, executePlanList.size()));
            try {
                executeConcurrent(executeContext, otherExecutePlan, subCursors);
            } catch (Exception ex) {
                // Best effort: failures on the non-primary nodes are reported but not rethrown.
                log.warn("broadcast has failed on some datanode, chars is: " + executeContext.getSql(), ex);
            }
        }
    }


    /**
     * Groups the plans by data node id, submits one asynchronous task per group and collects
     * the resulting cursors. Per the original design note, groups run in parallel with each
     * other while the plans inside one group run serially (that depends on
     * {@code executeWithFuture}, which is not visible here).
     *
     * <p>Each future is awaited for at most 15 minutes. Every future is awaited even when an
     * earlier one failed, so cursors of successful groups are still added to
     * {@code subCursors}. If the calling thread is interrupted while waiting, its interrupt
     * status is restored before this method returns or throws. JVM {@link Error}s are not
     * wrapped and propagate unchanged.
     *
     * @param executeContext  execution context passed to the data node executor
     * @param executePlanList plans to execute
     * @param subCursors      output list that receives the cursors of every successful group
     * @throws RuntimeException wrapping the first failure (execution error, timeout or interruption)
     */
    private void executeConcurrent(ExecuteContext executeContext, List<ExecutePlan> executePlanList, List<ISortingCursor> subCursors) throws RuntimeException {
        // Bucket the plans by the data node they target.
        Map<String, List<ExecutePlan>> dataNodeIdToExecutePlanListMap = new HashMap<String, List<ExecutePlan>>();
        for (ExecutePlan executePlan : executePlanList) {
            String dataNodeId = executePlan.getDataNodeId();
            List<ExecutePlan> groupedPlans = dataNodeIdToExecutePlanListMap.get(dataNodeId);
            if (groupedPlans == null) {
                groupedPlans = new LinkedList<ExecutePlan>();
                dataNodeIdToExecutePlanListMap.put(dataNodeId, groupedPlans);
            }
            groupedPlans.add(executePlan);
        }

        // Submit one task per data node group.
        List<Future<List<ISortingCursor>>> cursorListFutureList = new LinkedList<Future<List<ISortingCursor>>>();
        for (List<ExecutePlan> groupedPlans : dataNodeIdToExecutePlanListMap.values()) {
            Future<List<ISortingCursor>> cursorListFuture = DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().executeWithFuture(executeContext, groupedPlans);
            cursorListFutureList.add(cursorListFuture);
        }

        List<RuntimeException> exs = new ArrayList<RuntimeException>();
        boolean interrupted = false;
        for (Future<List<ISortingCursor>> future : cursorListFutureList) {
            try {
                subCursors.addAll(future.get(15, TimeUnit.MINUTES));
            } catch (InterruptedException e) {
                // Remember the interruption but keep draining the remaining futures.
                interrupted = true;
                exs.add(new RuntimeException(e));
            } catch (Exception e) {
                exs.add(new RuntimeException(e));
            }
        }
        if (interrupted) {
            // Re-assert the flag that Future.get cleared, so callers can still observe it.
            Thread.currentThread().interrupt();
        }

        if (!exs.isEmpty()) {
            throw exs.get(0);
        }
    }

    /**
     * Builds one {@link ConcurrentMergeCursor} per "round" of plans, where round {@code i}
     * contains the {@code i}-th plan of every data node group that has at least {@code i + 1}
     * plans. Per the original design note: serial inside a group, parallel across groups.
     *
     * <pre>
     *             group1    group2   group3
     * ---------------------------------------
     * cursor0      t10       t20      t30
     * ---------------------------------------
     * cursor1      t11       t21      t31
     * ---------------------------------------
     * cursor2      t12       t22      t32
     * ---------------------------------------
     * </pre>
     *
     * Only the cursor of the first round is initialised here; the others are just created
     * and appended.
     *
     * @param executeContext  execution context handed to each {@link ConcurrentMergeCursor}
     * @param executePlanList plans to distribute over the rounds
     * @param subCursors      output list that receives one cursor per round, in round order
     */
    private void executeGroupConcurrent(ExecuteContext executeContext, List<ExecutePlan> executePlanList, List<ISortingCursor> subCursors) throws RuntimeException {
        // Bucket the plans by the data node they target.
        Map<String, List<ExecutePlan>> plansByDataNode = new HashMap<String, List<ExecutePlan>>();
        for (ExecutePlan plan : executePlanList) {
            String nodeId = plan.getDataNodeId();
            if (!plansByDataNode.containsKey(nodeId)) {
                plansByDataNode.put(nodeId, new LinkedList<ExecutePlan>());
            }
            plansByDataNode.get(nodeId).add(plan);
        }

        // The number of rounds equals the size of the largest bucket.
        int roundCount = 0;
        for (List<ExecutePlan> bucket : plansByDataNode.values()) {
            roundCount = Math.max(roundCount, bucket.size());
        }

        for (int round = 0; round < roundCount; round++) {
            List<ExecutePlan> roundPlans = new ArrayList<ExecutePlan>();
            for (List<ExecutePlan> bucket : plansByDataNode.values()) {
                // Buckets that ran out of plans do not take part in this round.
                if (round < bucket.size()) {
                    roundPlans.add(bucket.get(round));
                }
            }
            ConcurrentMergeCursor roundCursor = new ConcurrentMergeCursor(executeContext, roundPlans);
            subCursors.add(roundCursor);
            if (round == 0) {
                roundCursor.init();
            }
        }
    }

    /**
     * Executes the plans one after another on the calling thread and appends each resulting
     * cursor to {@code cursorList} in plan order.
     *
     * @param executeContext  execution context passed to the data node executor
     * @param executePlanList plans to execute serially
     * @param cursorList      output list that receives one cursor per plan
     */
    private void execute(ExecuteContext executeContext, List<ExecutePlan> executePlanList, List<ISortingCursor> cursorList) throws RuntimeException {
        Iterator<ExecutePlan> pendingPlans = executePlanList.iterator();
        while (pendingPlans.hasNext()) {
            ExecutePlan nextPlan = pendingPlans.next();
            cursorList.add(DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().execute(executeContext, nextPlan));
        }
    }

    /**
     * Applies aggregation for a merge query by delegating to {@code executeMergeAgg}.
     * {@code closeResultCursor} and {@code aggregateFunctionList} are accepted but not used
     * by this implementation.
     *
     * @return the cursor returned by {@code executeMergeAgg}
     */
    protected ISortingCursor executeAggregate(Repository repository, ExecuteContext executeContext, ExecutePlan executePlan, ISortingCursor cursor, boolean closeResultCursor, List<OrderBy> groupByList, List<Function> aggregateFunctionList) throws RuntimeException {
        ISortingCursor aggregated = executeMergeAgg(repository, executeContext, executePlan, cursor, groupByList);
        return aggregated;
    }

    /**
     * Merges first, then aggregates.
     *
     * <p>When the plan has no item list the cursor is returned untouched. Otherwise, unless
     * the merge is a distinct/group-by on the shard columns, the first aggregate function
     * that needs a distinct argument causes the cursor to be sorted on the sub-query's items
     * and wrapped in a {@link DistinctCursor}. Finally an aggregate cursor is stacked on top
     * when there are aggregate functions to calculate or group-by columns.
     *
     * @param executePlan   a {@link Query}; cast to {@link MergeQuery} once items are present
     * @param sortingCursor the merged input cursor
     * @param groupByList   group-by columns, may be {@code null} or empty
     * @return the (possibly wrapped) cursor
     */
    private ISortingCursor executeMergeAgg(Repository repository, ExecuteContext executeContext, ExecutePlan executePlan, ISortingCursor sortingCursor, List<OrderBy> groupByList) throws RuntimeException {
        List itemList = ((Query) executePlan).getItemList();
        if (itemList == null) {
            return sortingCursor;
        }

        List<Function> pendingAggregates = getAggregateFunctionListNeedToCalculate(itemList, true);
        MergeQuery mergeQuery = (MergeQuery) executePlan;

        // Distinct on the routing (shard) columns needs no extra work.
        if (!mergeQuery.isDistinctGroupByShardColumns()) {
            boolean distinctArgRequired = false;
            for (Function candidate : pendingAggregates) {
                if (candidate.isNeedDistinctArg()) {
                    distinctArgRequired = true;
                    break;
                }
            }

            if (distinctArgRequired) {
                Query firstSubQuery = (Query) mergeQuery.getExecutePlanList().get(0);

                // The order by here is the one the sub-query exposes outwards, so aliases must be substituted.
                List<Item> exposedItems = Utils.copyItemList(firstSubQuery.getItemList());
                for (Item exposed : exposedItems) {
                    exposed.setTableName(firstSubQuery.getAlias());
                    if (exposed.getAlias() != null) {
                        exposed.setColumnName(exposed.getAlias());
                        exposed.setAlias(null);
                    }
                }

                // The order of the distinct columns does not matter.
                sortingCursor = this.processOrderBy(executeContext, sortingCursor, (Query) executePlan, getOrderByList(exposedItems), false);
                sortingCursor = new DistinctCursor(sortingCursor, sortingCursor.getOrderByList());
            }
        }

        boolean hasAggregates = pendingAggregates != null && !pendingAggregates.isEmpty();
        boolean hasGroupBy = groupByList != null && !groupByList.isEmpty();
        if (hasAggregates || hasGroupBy) {
            sortingCursor = repository.getCursorFactory().aggregateCursor(executeContext, sortingCursor, pendingAggregates, groupByList, true);
        }
        return sortingCursor;
    }


    /**
     * Applies the requested ordering on top of {@code cursor}.
     *
     * <p>Without any order-by columns the cursor is returned unchanged, and a cursor that is
     * not an {@link IMergeCursor} is handed to the superclass. For a merge cursor the strategy
     * is decided from its first sub-cursor only:
     * <ul>
     * <li>sub-cursor already in the requested order and the query runs fully concurrent:
     * merge-sort the sub-cursors, keeping duplicates;</li>
     * <li>any other strategy, or a serial / per-data-node concurrency way: sort through a
     * temporary table;</li>
     * <li>no sub-cursor at all: an {@link IllegalArgumentException} is thrown.</li>
     * </ul>
     *
     * @param query                must be a {@link MergeQuery} when {@code cursor} is a merge cursor
     * @param orderByListInRequest requested ordering, may be {@code null} or empty
     * @param needOrderMatch       forwarded to {@code chooseOrderByStrategy}
     * @return the ordered cursor
     */
    protected ISortingCursor processOrderBy(ExecuteContext executeContext, ISortingCursor cursor, Query query, List<OrderBy> orderByListInRequest, boolean needOrderMatch) throws RuntimeException {
        Repository repository = executeContext.getRepository();
        if (orderByListInRequest == null || orderByListInRequest.isEmpty()) {
            return cursor;
        }
        if (!(cursor instanceof IMergeCursor)) {
            return super.processOrderBy(executeContext, cursor, query, orderByListInRequest, needOrderMatch);
        }

        List<ISortingCursor> childCursors = ((IMergeCursor) cursor).getSortingCursorList();

        // Inspecting a single child is enough; anything other than "normal" means temporary table.
        OrderByStrategy selected = null;
        Iterator<ISortingCursor> children = childCursors.iterator();
        if (children.hasNext()) {
            OrderByStrategy firstChildStrategy = chooseOrderByStrategy(children.next(), orderByListInRequest, needOrderMatch);
            selected = firstChildStrategy == OrderByStrategy.normal ? OrderByStrategy.normal : OrderByStrategy.temporaryTable;
        }

        MergeQuery mergeQuery = (MergeQuery) query;
        // Serial, or serial-inside-node / parallel-across-nodes: buffer the data in a temporary table.
        if (selected == OrderByStrategy.normal && mergeQuery.getQueryConcurrencyWay() != QueryConcurrencyWay.concurrent) {
            selected = OrderByStrategy.temporaryTable;
        }

        if (selected == OrderByStrategy.normal) {
            // Plain merge sort, duplicates are kept.
            return buildMergeSortCursor(executeContext, repository, childCursors, true);
        }
        if (selected == OrderByStrategy.temporaryTable || selected == OrderByStrategy.reverseCursor) {
            return repository.getCursorFactory().temporaryTableSortCursor(executeContext, cursor, orderByListInRequest, true, query.getRequestId());
        }
        throw new IllegalArgumentException("shoult not be here:" + selected);
    }

    /**
     * Asks the repository's cursor factory for a merge-sorted cursor over {@code cursorList}.
     *
     * @param duplicated forwarded unchanged to the factory; callers in this class pass
     *                   {@code true} to keep duplicates and {@code false} for unions
     * @return the cursor created by the factory
     */
    private ISortingCursor buildMergeSortCursor(ExecuteContext executeContext, Repository repository, List<ISortingCursor> cursorList, boolean duplicated) throws RuntimeException {
        return repository.getCursorFactory().mergeSortedCursor(executeContext, cursorList, duplicated);
    }

    /**
     * Handles group by and aggregate functions for a merge query.
     *
     * <p>Original design note: on a single node the principle is to use indexes as much as
     * possible for count / max / min. The key conditions considered are:
     * <ol>
     * <li>whether a group by is needed;</li>
     * <li>which aggregate is used;</li>
     * <li>whether distinct is needed;</li>
     * <li>whether this is a merge node.</li>
     * </ol>
     *
     * <p>Here the cursor is first sorted on the group-by columns (unless the grouping is on
     * the shard columns) and then passed to {@code executeAggregate}.
     *
     * @param query  must be a {@link MergeQuery}
     * @param cursor the merged input cursor
     * @return the cursor after ordering and aggregation
     */
    protected ISortingCursor processGroupByAndAggregateFunction(ExecuteContext executeContext, Query query, ISortingCursor cursor) throws RuntimeException {
        MergeQuery mergeQuery = (MergeQuery) query;
        List<OrderBy> groupByColumns = mergeQuery.getGroupByList();
        boolean closeResultCursor = executeContext.isCloseResultCursor();
        final Repository repository = executeContext.getRepository();

        List selectedItems = getItemList(mergeQuery);
        List<Function> aggregatesToCalculate = getAggregateFunctionListNeedToCalculate(selectedItems, true);

        // Rows must be sorted on the group-by columns first, except when grouping on the shard columns.
        boolean hasGroupBy = groupByColumns != null && !groupByColumns.isEmpty();
        if (hasGroupBy && !mergeQuery.isGroupByShardColumns()) {
            cursor = processOrderBy(executeContext, cursor, mergeQuery, groupByColumns, false);
        }

        return executeAggregate(repository, executeContext, mergeQuery, cursor, closeResultCursor, groupByColumns, aggregatesToCalculate);
    }
}
